iT邦幫忙

2021 iThome 鐵人賽

DAY 21
1

Flow 是 cold stream,只有在呼叫 Terminal operator 的時候才會執行。也就是說每一個 Flow 都只有一次性的工作,只要呼叫一次 Terminal operator 就會完成這一次的呼叫。例如我們的 Terminal operator 選擇 collector ,那麼當某個 Flow 執行 collector 之後,就會把 Flow 裏面所有的資料根據我們所設定的動作來執行,最後經過 collector 把結果儲存下來,然後就結束這一次的任務。

Flow 也有可以支援 hot stream 的方式,它的名稱為 SharedFlow,它可以支援多個 Collector 共享 Flow 的發射的內容 (emitted value),所以對於所有的 Collector,只有一次的執行。

那麼,SharedFlow 是如何共享所有發射的內容呢?它是使用廣播(broadcast)的方式。因為 SharedFlow 永遠都不會結束等著廣播發送內容給所有的 Collector,所以稱為 hot stream。

SharedFlow 的介面 interface,如下:


interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}

由上我們得知 SharedFlow 是繼承 Flow 的,其中裏面包含一個函式 replayCache:List<T> ,當有一個新的 collector 加入時,就會根據設定的 replay 數量來把最後的項目廣播給新的 collector 上。

shareIn

我們可以使用 shareIn 讓原本的 Flow 轉變成 SharedFlow

class Day21 {
    val scope = CoroutineScope(Job())
    fun sharedFlow(): Flow<Int> = flow {
        println("Flow started")
        repeat(10) {
            delay(100)
            emit(it)
        }
    }.shareIn(
        scope,
        replay = 10,
        started = SharingStarted.WhileSubscribed()
    )
}

在上方中,我們在原本的 flow 底下使用 shareIn 來讓這個 flow 轉變成 sharedFlow,其中需要帶三個參數,第一個是帶入一個 CoroutineScope,也就是這個 Flow 所在的 Scope,第二個則是當有新的 collector 加入時,需要重播幾項,最後一個參數 started 則是什麼時候開始啟動。

  • SharingStarted.WhileSubscribed():表示當第一個 collector 出現的時候就啟動,最後一個 collector 消失的時候就立即停止,保留重播的快取。

執行:

@OptIn(InternalCoroutinesApi::class)

fun main() = runBlocking {
    val day21 = Day21()
    val sharedFlow = day21.sharedFlow()

    launch {
        sharedFlow.collect {
            println("(1): $it")
        }
    }
    delay(500)
    launch {
        sharedFlow.collect {
            println("(2): $it")
        }
    }

    println("done")
}

外層的 coroutine 會先執行 500 毫秒,第一個 launch 執行 500 毫秒時外層 coroutine 會結束 delay,接著第二個 launch 也會跟著執行,但是因為第一個 launch 已經在 500 毫秒內接收了一堆內容,所以這時候 sharedFlow 就要把那些內容發給第二個 launch,等到發完之後,接下來的每一筆資料都會同時傳給兩個 launch。

結果如下:

Flow started
(1): 0
(1): 1
(1): 2
(1): 3
done
(2): 0
(2): 1
(2): 2
(2): 3
(1): 4
(2): 4
(1): 5
(2): 5
(1): 6
(2): 6
(1): 7
(2): 7
(1): 8
(2): 8
(1): 9
(2): 9

雖然我們的 flow 只有發送 10 個值,但是 sharedFlow 不會因為我們發完之後就停了,它會一直處於執行的狀態,除非所有的 collector 都消失。(在這邊的消失可能是取消或是發生 exception)

另外,這邊的 collector 因為都是在執行之後就開始接收內容,所以 collector 在 sharedFlow 就稱為 subscriber (訂閱者)。

--- 接下篇 ---


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day20:Flow 想在其他的執行緒執行,可以嗎?
下一篇
Day22:Hot Flow - SharedFlow (Part II)
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言